package consul

import (
	"epg/jmconf"
	"epg/utils"

	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/watch"
	"github.com/zeast/logs"
)

// CWatch watches a single consul key and a consul key prefix.
type CWatch struct {
	// addr is the address handed to the watch plans when they are run; dc
	// and token are sent as the "datacenter" and "token" parameters of
	// every watch.
	addr, dc, token string
	// path is prepended to every key and prefix being watched.
	path string
	// wk is the plan started by WatchKey and wp the one started by
	// WatchPrefix; Close stops both.
	wk, wp *watch.WatchPlan
}

// Compile-time check that *CWatch implements jmconf.Watcher.
var _ jmconf.Watcher = (*CWatch)(nil)

// NewCWatch builds a CWatch from the discovery configuration.
//
// The first entry of cfg.Addrs becomes the consul address; cfg.DataCenter,
// cfg.Path and cfg.Token are copied as-is. When cfg.Addrs is empty an error is
// logged and the address is left blank instead of panicking on the index
// expression. cfg must not be nil.
func NewCWatch(cfg *jmconf.DisCfg) *CWatch {
	// Named cw rather than watch so the imported watch package is not shadowed.
	cw := &CWatch{
		dc:    cfg.DataCenter,
		path:  cfg.Path,
		token: cfg.Token,
	}
	if len(cfg.Addrs) == 0 {
		logs.Error("consul watch: no address configured")
		return cw
	}
	cw.addr = cfg.Addrs[0]
	return cw
}

// WatchKey starts a consul "key" watch on cw.path+key and forwards every
// notification to ops.
//
// Each notification is delivered as a jmconf.Opt with Op set to
// jmconf.OPModify, the caller-supplied key (without the cw.path prefix), the
// pair's value and its ModifyIndex. No comparison with the previous value is
// made, so every notification is forwarded. The send on ops blocks, so the
// handler stalls until the receiver takes the update.
//
// The plan is stored in cw.wk so that Close can stop it, and is run in its own
// goroutine. An error is returned only when the watch parameters cannot be
// parsed; an error returned by the running plan is logged.
func (cw *CWatch) WatchKey(key string, ops chan jmconf.Opt) error {
	wp, err := watch.Parse(map[string]interface{}{
		"type":       "key",
		"datacenter": cw.dc,
		"token":      cw.token,
		"key":        cw.path + key,
	})
	if err != nil {
		logs.Errorf("watch key 失败. %s", err)
		return err
	}

	wp.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return
		}
		pair, ok := raw.(*api.KVPair)
		if !ok || pair == nil {
			// pair is always nil on this path, so report the raw payload.
			logs.Errorf("watch 到未知的信息, %t, %+v", ok, raw)
			return
		}
		ops <- jmconf.Opt{
			Op: jmconf.OPModify,
			KV: jmconf.KV{Key: key, Val: pair.Value, LastIdx: pair.ModifyIndex},
		}
	}

	cw.wk = wp
	go func() {
		defer utils.PrintPanicStack()
		// Run the local plan rather than cw.wk: Close may already have reset
		// cw.wk to nil by the time this goroutine is scheduled.
		if err := wp.Run(cw.addr); err != nil {
			logs.Errorf("watch 失败, %s", err)
		}
	}()
	return nil
}

// WatchPrefix starts a consul "keyprefix" watch on cw.path+prefix and sends
// the changes between consecutive snapshots to ops.
//
// For every notification the handler computes diff(previous, current) and
// tries to send the resulting batch without blocking. When ops cannot accept
// the batch an error is logged and the previous snapshot is kept, so the
// dropped changes are included again in the batch built for the next
// notification instead of being lost.
//
// The plan is stored in cw.wp so that Close can stop it, and is run in its own
// goroutine. An error is returned only when the watch parameters cannot be
// parsed; an error returned by the running plan is logged.
func (cw *CWatch) WatchPrefix(prefix string, ops chan []jmconf.Opt) error {
	wp, err := watch.Parse(map[string]interface{}{
		"type":       "keyprefix",
		"datacenter": cw.dc,
		"token":      cw.token,
		"prefix":     cw.path + prefix,
	})
	if err != nil {
		logs.Errorf("watch prefix 失败. %s", err)
		return err
	}

	// prev is the last snapshot that was successfully delivered to ops.
	prev := api.KVPairs{}
	wp.Handler = func(idx uint64, raw interface{}) {
		if raw == nil {
			return
		}
		pairs, ok := raw.(api.KVPairs)
		// NOTE(review): a nil api.KVPairs is rejected here as well; if consul
		// reports "no keys left under the prefix" that way, the final
		// deletions are never forwarded — confirm against the consul client.
		if !ok || pairs == nil {
			// pairs is always nil on this path, so report the raw payload.
			logs.Errorf("watch 到未知的信息, %t, %+v", ok, raw)
			return
		}

		batch := make([]jmconf.Opt, 0, len(pairs))
		for _, op := range diff(prev, pairs) {
			batch = append(batch, op)
		}
		select {
		case ops <- batch:
			// Advance the baseline only once the batch has been handed over.
			prev = pairs
		default:
			logs.Error("接收 consul 信息的管道已满")
		}
	}

	cw.wp = wp
	go func() {
		defer utils.PrintPanicStack()
		// Run the local plan rather than cw.wp: Close may already have reset
		// cw.wp to nil by the time this goroutine is scheduled.
		if err := wp.Run(cw.addr); err != nil {
			logs.Errorf("watch 失败, %s", err)
		}
	}()
	return nil
}

// Close stops the key plan and the prefix plan, if they were started, and
// clears both fields. It always returns nil.
func (cw *CWatch) Close() error {
	for _, slot := range []**watch.WatchPlan{&cw.wk, &cw.wp} {
		if *slot == nil {
			continue
		}
		(*slot).Stop()
		*slot = nil
	}
	return nil
}
